# Copyright 2020-2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoModelForSequenceClassification, AutoTokenizer
from transformers.utils import is_peft_available

from trl.experimental.xpo import XPOConfig, XPOTrainer

from ..testing_utils import RandomPairwiseJudge, TrlTestCase, require_llm_blender, require_peft


if is_peft_available():
    from peft import LoraConfig, get_peft_model


@pytest.mark.low_priority
class TestXPOTrainer(TrlTestCase):
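    """Tests for the experimental XPOTrainer, covering reward-model and judge-based training, with and without
    PEFT (LoRA) adapters."""
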
    def setup_method(self):
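        """Load a tiny causal LM as the policy and reference model, a single-label sequence-classification model
        as the reward model, and the matching tokenizer, padding with the EOS token."""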
        self.model_id = "trl-internal-testing/tiny-Qwen2ForCausalLM-2.5"
        self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
        self.ref_model = AutoModelForCausalLM.from_pretrained(self.model_id)
        self.reward_model = AutoModelForSequenceClassification.from_pretrained(self.model_id, num_labels=1)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        self.tokenizer.pad_token = self.tokenizer.eos_token

    @pytest.mark.parametrize("config_name", ["standard_prompt_only", "conversational_prompt_only"])
    def test_xpo_trainer_training(self, config_name):
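        """Train for a few steps with a reward model and an explicit reference model, on both standard and
        conversational prompt-only datasets, and check that a training loss is logged."""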
        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=2,
            max_steps=3,
            remove_unused_columns=False,
            gradient_accumulation_steps=1,
            learning_rate=9e-1,
            eval_strategy="steps",
            report_to="none",
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", config_name)

        trainer = XPOTrainer(
            model=self.model,
            ref_model=self.ref_model,
            reward_funcs=self.reward_model,
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset["train"],
            eval_dataset=dummy_dataset["test"],
        )

        trainer.train()

        # Check that the training loss was logged
        assert "train_loss" in trainer.state.log_history[-1]

    @require_peft
    def test_training_with_peft(self):
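        """Train with a LoRA `peft_config` and no explicit `ref_model`; the trainer is expected to fall back to
        the base model (with the adapter disabled) as the implicit reference policy."""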
        lora_config = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM")
        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=2,
            max_steps=3,
            learning_rate=5.0e-7,
            eval_strategy="steps",
            report_to="none",
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only")

        trainer = XPOTrainer(
            model=self.model,
            reward_funcs=self.reward_model,
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset["train"],
            eval_dataset=dummy_dataset["test"],
            peft_config=lora_config,
        )

        trainer.train()

        # Check that the training loss was logged
        assert "train_loss" in trainer.state.log_history[-1]

    @require_peft
    def test_training_with_peft_and_ref_model(self):
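        """Train with a LoRA `peft_config` while also passing an explicit `ref_model`."""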
        lora_config = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM")
        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=2,
            max_steps=3,
            learning_rate=5.0e-7,
            eval_strategy="steps",
            report_to="none",
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only")

        trainer = XPOTrainer(
            model=self.model,
            ref_model=self.ref_model,
            reward_funcs=self.reward_model,
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset["train"],
            eval_dataset=dummy_dataset["test"],
            peft_config=lora_config,
        )

        trainer.train()

        # Check that the training loss was logged
        assert "train_loss" in trainer.state.log_history[-1]

    @require_peft
    def test_training_with_peft_model_and_peft_config(self):
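        """Wrap the base model with one LoRA adapter, then pass a second LoRA config to the trainer so that a
        fresh "train" adapter is created on top of it and is the only one trained."""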
        model_lora_config = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.1, bias="none", task_type="CAUSAL_LM")
        model = get_peft_model(self.model, model_lora_config)
        # Pass a second LoRA config so that only the newly created "train" adapter is trained
        lora_train_config = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none", task_type="CAUSAL_LM")
        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=2,
            max_steps=3,
            learning_rate=5.0e-7,
            eval_strategy="steps",
            report_to="none",
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only")

        trainer = XPOTrainer(
            model=model,
            reward_funcs=self.reward_model,
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset["train"],
            eval_dataset=dummy_dataset["test"],
            peft_config=lora_train_config,
        )

        trainer.train()

        # Check that the training loss was logged
        assert "train_loss" in trainer.state.log_history[-1]

    @require_peft
    def test_training_pre_pefted_model_implicit_ref(self):
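        """Pass an already PEFT-wrapped model with `ref_model=None`; the reference policy is implicit, as the
        test name suggests (presumably the base model with adapters disabled)."""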
        lora_config = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.1, bias="none", task_type="CAUSAL_LM")
        peft_model_instance = get_peft_model(self.model, lora_config)

        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=1,
            max_steps=2,
            learning_rate=5.0e-7,
            eval_strategy="no",
            report_to="none",
            remove_unused_columns=False,
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only")["train"]

        trainer = XPOTrainer(
            model=peft_model_instance,
            ref_model=None,
            reward_funcs=self.reward_model,  # use a reward model (not a judge) so _generate_completions is exercised as expected
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset,
        )

        trainer.train()

        assert "train_loss" in trainer.state.log_history[-1]

    @pytest.mark.parametrize("config_name", ["standard_prompt_only", "conversational_prompt_only"])
    @require_llm_blender
    def test_xpo_trainer_judge_training(self, config_name):
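        """Train using a pairwise judge instead of a reward model, on both standard and conversational
        prompt-only datasets."""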
        training_args = XPOConfig(
            output_dir=self.tmp_dir,
            per_device_train_batch_size=2,
            max_steps=3,
            remove_unused_columns=False,
            gradient_accumulation_steps=1,
            learning_rate=9e-1,
            eval_strategy="steps",
            report_to="none",
        )
        dummy_dataset = load_dataset("trl-internal-testing/zen", config_name)
        judge = RandomPairwiseJudge()

        trainer = XPOTrainer(
            model=self.model,
            ref_model=self.ref_model,
            judge=judge,
            args=training_args,
            processing_class=self.tokenizer,
            train_dataset=dummy_dataset["train"],
            eval_dataset=dummy_dataset["test"],
        )

        trainer.train()

        # Check that the training loss was logged
        assert "train_loss" in trainer.state.log_history[-1]
